Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to scheduler message #3487

Draft
wants to merge 20 commits into
base: master
Choose a base branch
from

Conversation

lillo42
Copy link
Contributor

@lillo42 lillo42 commented Jan 20, 2025

  • Remove unnecessary code
  • Fixes build
  • Add Unit test
  • Finish implementation
  • Add Sample
  • Implement support to HangFire
  • Implement support to Quartz
  • Implement support to AWS Scheduler

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: -0.02 (8.17 -> 8.15)

  • Declining Code Health: 4 findings(s) 🚩

  • Affected Hotspots: 2 files(s) 🔥

View detailed results in CodeScene

Comment on lines +132 to +133
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All,
IAmAMessageSchedulerFactory? messageSchedulerFactory = null)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ Getting worse: Constructor Over-Injection
CommandProcessor increases from 8 to 9 arguments, threshold = 5

Comment on lines +183 to +185
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All,
IAmAMessageSchedulerFactory? messageSchedulerFactory = null)
: this(subscriberRegistry, handlerFactory, requestContextFactory, policyRegistry, featureSwitchRegistry, inboxConfiguration, messageSchedulerFactory: messageSchedulerFactory)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ Getting worse: Constructor Over-Injection
CommandProcessor increases from 11 to 12 arguments, threshold = 5

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: -0.02 (8.17 -> 8.15)

  • Declining Code Health: 4 findings(s) 🚩

  • Affected Hotspots: 2 files(s) 🔥

View detailed results in CodeScene

@iancooper iancooper added 2 - In Progress grabbed by assignee feature request .NET Pull requests that update .net code Draft This is a work in progress labels Jan 22, 2025
Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: -0.16 (8.69 -> 8.53)

  • Declining Code Health: 12 findings(s) 🚩
  • Improving Code Health: 2 findings(s) ✅
  • Affected Hotspots: 7 files(s) 🔥

View detailed results in CodeScene

@@ -37,6 +37,7 @@ THE SOFTWARE. */
using Paramore.Brighter.FeatureSwitch;
using Paramore.Brighter.Logging;
using Paramore.Brighter.Observability;
using Paramore.Brighter.Tasks;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Getting worse: Code Duplication
introduced similar code in: SchedulerPost,SchedulerPost,SchedulerPostAsync,SchedulerPostAsync

Suppress

@@ -37,6 +37,7 @@ THE SOFTWARE. */
using Paramore.Brighter.FeatureSwitch;
using Paramore.Brighter.Logging;
using Paramore.Brighter.Observability;
using Paramore.Brighter.Tasks;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ Getting worse: Missing Arguments Abstractions
The average number of function arguments increases from 4.12 to 4.33, threshold = 4.00

Comment on lines +554 to +592
public async Task<string> SchedulerPostAsync<TRequest>(TRequest request,
TimeSpan delay,
RequestContext? requestContext = null,
Dictionary<string, object>? args = null,
bool continueOnCapturedContext = true,
CancellationToken cancellationToken = default) where TRequest : class, IRequest
{
if (_messageSchedulerFactory == null)
{
throw new InvalidOperationException("No message scheduler factory defined.");
}

s_logger.LogInformation("Scheduling a request: {RequestType} {Id}", request.GetType(), request.Id);

var span = _tracer?.CreateSpan(CommandProcessorSpanOperation.Scheduler, request, requestContext?.Span, options: _instrumentationOptions);
var context = InitRequestContext(span, requestContext);

try
{
Message message = await s_mediator!.CreateMessageFromRequestAsync(request, context, cancellationToken);

var scheduler = _messageSchedulerFactory.Create(this);
return scheduler switch
{
IAmAMessageSchedulerAsync async => await async.ScheduleAsync(message, delay, cancellationToken).ConfigureAwait(continueOnCapturedContext),
IAmAMessageSchedulerSync sync => sync.Schedule(message, delay),
_ => throw new InvalidOperationException("Message scheduler must be sync or async")
};
}
catch (Exception e)
{
_tracer?.AddExceptionToSpan(span, [e]);
throw;
}
finally
{
_tracer?.EndSpan(span);
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ New issue: Excess Number of Function Arguments
SchedulerPostAsync has 6 arguments, threshold = 4

Comment on lines +594 to +633
public async Task<string> SchedulerPostAsync<TRequest>(TRequest request,
DateTimeOffset at,
RequestContext? requestContext = null,
Dictionary<string, object>? args = null,
bool continueOnCapturedContext = true,
CancellationToken cancellationToken = default)
where TRequest : class, IRequest
{
if (_messageSchedulerFactory == null)
{
throw new InvalidOperationException("No message scheduler factory defined.");
}

s_logger.LogInformation("Scheduling a request: {RequestType} {Id}", request.GetType(), request.Id);

var span = _tracer?.CreateSpan(CommandProcessorSpanOperation.Scheduler, request, requestContext?.Span, options: _instrumentationOptions);
var context = InitRequestContext(span, requestContext);

try
{
Message message = await s_mediator!.CreateMessageFromRequestAsync(request, context, cancellationToken);

var scheduler = _messageSchedulerFactory.Create(this);
return scheduler switch
{
IAmAMessageSchedulerAsync async => await async.ScheduleAsync(message, at, cancellationToken).ConfigureAwait(continueOnCapturedContext),
IAmAMessageSchedulerSync sync => sync.Schedule(message, at),
_ => throw new InvalidOperationException("Message scheduler must be sync or async")
};
}
catch (Exception e)
{
_tracer?.AddExceptionToSpan(span, [e]);
throw;
}
finally
{
_tracer?.EndSpan(span);
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ New issue: Excess Number of Function Arguments
SchedulerPostAsync has 6 arguments, threshold = 4

Comment on lines +216 to +217
InstrumentationOptions instrumentationOptions = InstrumentationOptions.All,
IAmAMessageSchedulerFactory? messageSchedulerFactory = null)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ Getting worse: Constructor Over-Injection
CommandProcessor increases from 8 to 9 arguments, threshold = 5

@@ -144,13 +149,21 @@ public async Task SendWithDelayAsync(Message message, TimeSpan? delay, Cancellat

_pendingConfirmations.TryAdd(await Channel.GetNextPublishSequenceNumberAsync(cancellationToken), message.Id);

if (DelaySupported)
if (delay == TimeSpan.Zero || DelaySupported)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Complex Method
SendWithDelayAsync has a cyclomatic complexity of 10, threshold = 9

Suppress

@@ -144,13 +149,21 @@ public async Task SendWithDelayAsync(Message message, TimeSpan? delay, Cancellat

_pendingConfirmations.TryAdd(await Channel.GetNextPublishSequenceNumberAsync(cancellationToken), message.Id);

if (DelaySupported)
if (delay == TimeSpan.Zero || DelaySupported)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Bumpy Road Ahead
SendWithDelayAsync has 2 blocks with nested conditional logic. Any nesting of 2 or deeper is considered. Threshold is one single, nested block per function

Suppress

/// <returns>Task.</returns>
public async Task SendWithDelayAsync(Message message, TimeSpan? delay, CancellationToken cancellationToken = default)
{
await SendAsync(message, cancellationToken);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ℹ Getting worse: Code Duplication
introduced similar code in: SendWithDelay,SendWithDelayAsync

@@ -42,6 +42,7 @@ public static class BrighterSpanExtensions
CommandProcessorSpanOperation.Send => "send",
CommandProcessorSpanOperation.Clear => "clear",
CommandProcessorSpanOperation.Archive => "archive",
CommandProcessorSpanOperation.Scheduler => "scheduler",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ Getting worse: Complex Method
ToSpanName increases in cyclomatic complexity from 9 to 10, threshold = 9

Suppress

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: -0.16 (8.69 -> 8.53)

  • Declining Code Health: 12 findings(s) 🚩
  • Improving Code Health: 2 findings(s) ✅
  • Affected Hotspots: 7 files(s) 🔥

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: -0.15 (8.69 -> 8.54)

  • Declining Code Health: 13 findings(s) 🚩
  • Improving Code Health: 2 findings(s) ✅
  • Affected Hotspots: 7 files(s) 🔥

View detailed results in CodeScene

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: -0.15 (8.69 -> 8.54)

  • Declining Code Health: 13 findings(s) 🚩
  • Improving Code Health: 2 findings(s) ✅
  • Affected Hotspots: 7 files(s) 🔥

View detailed results in CodeScene

Comment on lines +28 to +43
public string Schedule(Message message, TimeSpan delay)
{
var id = getOrCreateSchedulerId(message);
if (s_timers.TryGetValue(id, out var timer))
{
if (onConflict == OnSchedulerConflict.Throw)
{
throw new InvalidOperationException($"scheduler with '{id}' id already exists");
}

timer.Dispose();
}

s_timers[id] = timeProvider.CreateTimer(Execute, (processor, id, message, false), delay, TimeSpan.Zero);
return id;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Code Duplication
The module contains 2 functions with similar structure: Schedule,ScheduleAsync

Suppress

Copy link
Member

@iancooper iancooper left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First, there is a tremendous amount of work here, that has broken the back of this. So thank you so much. I think my thoughts are generally:

  • Let's overload existing methods to take a DateTimeOffset scheduledAt parameter (and provide a helper method to create a scheduledAt from a TimeSpan that represents a delay) over new method names
  • We have different job types: Request (from CP goes back to send, post or publish, you need to know which) and Message (from a consumer implementation, used to delay posting a message, comes back to the consumer )
  • That makes your scheduler a bit more gnarly; you want an interface that the 3rd party lib support implements that offers either.

But once we have that this will solve a massive problem for us with supporting a Send at a point in time, which will be a really powerful addition in V10

```c#
public interface IAmACommandProcessor
{
string SchedulerSend<TRequest>(TimeSpan delay, TRequest request) where TRequest : class, IRequest;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we would be better off overloading Send with a time, than adding a new name here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, how about SendAt, PublishAt, PostAt etc.

{
if (Scheduler is IAmAMessageSchedulerAsync async)
{
await async.ScheduleAsync(message, delay.Value, cancellationToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably best to avoid a keyword in a variable name, use asyncScheduler instead?


if (Scheduler is IAmAMessageSchedulerSync sync)
{
sync.Schedule(message, delay.Value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably have to rename to strategy used above

return;
}

if (Scheduler is IAmAMessageSchedulerSync sync)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we assume that an async path, always has an async scheduler, and only use the sync scheduler on a sync path (or if transport does not support sync and async, just resort to whichever matches? This allows us to make a blocking decision via the Brighter Sync Context

delay ??= TimeSpan.Zero;
if (delay > TimeSpan.FromMinutes(15))
{
if (Scheduler is IAmAMessageSchedulerAsync async)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comments here on name (async)

}
}

public async Task<string> SchedulerPostAsync<TRequest>(TRequest request,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same comments apply. Overload with an At parameter over a new method convention; only one scheduler type


namespace Paramore.Brighter;

public class InMemoryMessageScheduler(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, thanks for making sure we have one of these

}

/// <inheritdoc cref="Dispose"/>
public void Dispose()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to ensure we dispose any remaining timers here?

var message = JsonSerializer.Deserialize<Message>(obj!, JsonSerialisationOptions.Options)!;
if (async)
{
await processor.PostAsync(new FireSchedulerMessage { Id = id, Message = message });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works if our intent is always to send a message, but on a Send or Publish intent is actually just to delay Send or Publish.

delay ??= TimeSpan.Zero;
if (delay != TimeSpan.Zero)
{
if (Scheduler is IAmAMessageSchedulerAsync async)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

usual comments on name and pick one

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: +0.01 (8.74 -> 8.75)

  • Declining Code Health: 14 findings(s) 🚩
  • Improving Code Health: 2 findings(s) ✅
  • Affected Hotspots: 7 files(s) 🔥

View detailed results in CodeScene

@@ -0,0 +1,402 @@
using System.Collections.Concurrent;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Primitive Obsession
In this module, 31.7% of all function arguments are primitive types, threshold = 30.0%

Suppress

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: +0.01 (8.74 -> 8.75)

  • Declining Code Health: 14 findings(s) 🚩
  • Improving Code Health: 2 findings(s) ✅
  • Affected Hotspots: 7 files(s) 🔥

View detailed results in CodeScene

Comment on lines +18 to +35
public string Schedule(Message message, DateTimeOffset at)
{
var id = getOrCreateSchedulerId(message);
var job = JobBuilder.Create<QuartzBrighterJob>()
.WithIdentity(getOrCreateSchedulerId(message), group!)
.UsingJobData("message", JsonSerializer.Serialize(
new FireSchedulerMessage { Id = id, Async = false, Message = message },
JsonSerialisationOptions.Options))
.Build();

var trigger = TriggerBuilder.Create()
.WithIdentity(getOrCreateSchedulerId(message) + "-trigger", group!)
.StartAt(at)
.Build();

var tmp = BrighterAsyncContext.Run(async () => await scheduler.ScheduleJob(job, trigger));
return id;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ New issue: Code Duplication
The module contains 2 functions with similar structure: Schedule,ScheduleAsync

Suppress

Copy link

@codescene-delta-analysis codescene-delta-analysis bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Health Quality Gates: FAILED

Change in average Code Health of affected files: +0.01 (8.74 -> 8.75)

  • Declining Code Health: 14 findings(s) 🚩
  • Improving Code Health: 2 findings(s) ✅
  • Affected Hotspots: 7 files(s) 🔥

View detailed results in CodeScene

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
2 - In Progress Draft This is a work in progress feature request grabbed by assignee .NET Pull requests that update .net code
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants